package ins.framework.kafka.assignpartion;

import kafka.common.*;
import kafka.common.OffsetAndMetadata;
import kafka.javaapi.*;
import kafka.network.BlockingChannel;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.Long;
import java.util.*;
import java.util.Map.Entry;

/**
 * Utility for reading and committing consumer-group offsets through the legacy
 * Kafka 0.8.x offset-management protocol ({@code OffsetFetchRequest} /
 * {@code OffsetCommitRequest} sent over a {@link BlockingChannel}).
 *
 * <p>All methods are static; each call opens its own channel and disconnects it
 * before returning. Not intended for high-frequency use.
 */
public class Test
{
    private static final Logger LOG = LoggerFactory.getLogger(Test.class);
    private static final int correlationId = 2;
    private static final String clientId = "internalConsumer";

    /** Utility class — not instantiable. */
    private Test() {
    }

    /**
     * Creates and connects a {@link BlockingChannel} to the first reachable broker
     * in a comma-separated {@code host:port} list.
     *
     * @param bootstrapServers1 broker list, e.g. {@code "host1:9092,host2:9092"};
     *                          spaces are stripped before parsing
     * @return a connected channel, or {@code null} / a disconnected channel when
     *         the list is empty or no broker could be reached — callers MUST
     *         null-check and {@code isConnected()}-check the result
     */
    public static BlockingChannel createBlockingChannel(String bootstrapServers1) {
        BlockingChannel channel = null;
        String bootstrapServers = bootstrapServers1 == null ? "" : bootstrapServers1.replaceAll(" ", "");
        // Original code compared with != (reference identity), which never catches
        // an empty string; use isEmpty() for a real content check.
        if (!bootstrapServers.isEmpty()) {
            List<String> hosts = new ArrayList<>();
            int port = 9092;
            for (String hostAndPort : bootstrapServers.split(",")) {
                String[] parts = hostAndPort.split(":");
                hosts.add(parts[0]);
                // NOTE(review): the port parsed from the LAST entry is used for every
                // host — pre-existing behavior, kept for compatibility; confirm all
                // brokers listen on the same port.
                port = Integer.parseInt(parts[1]);
            }

            // Try each host in order until one connects.
            for (int j = 0; j < hosts.size() && (channel == null || !channel.isConnected()); j++) {
                try {
                    channel = new BlockingChannel(hosts.get(j).trim(), port,
                            BlockingChannel.UseDefaultBufferSize(),
                            BlockingChannel.UseDefaultBufferSize(), 5000);
                    channel.connect();
                } catch (Exception e) {
                    LOG.info("###>:channel connect but failed with the exception {}", e.getMessage());
                }
            }
        }
        else {
            LOG.info("###>: bootstrapServers is null, so can not create blockingChannel");
        }
        return channel;
    }

    /**
     * Shared fetch path for the three public getOffset* methods: sends one
     * OffsetFetchRequest (version 1, i.e. offsets stored in Kafka) and returns
     * the raw per-partition results. Always disconnects the channel.
     *
     * @return the broker's offset map, or an empty map when the channel could not
     *         be established or the request failed
     */
    private static Map<TopicAndPartition, OffsetMetadataAndError> fetchOffsets(
            String bootstrapServers, String groupId, List<TopicAndPartition> partitions) {
        Map<TopicAndPartition, OffsetMetadataAndError> result = Collections.emptyMap();
        BlockingChannel channel = createBlockingChannel(bootstrapServers);
        if (channel == null || !channel.isConnected()) {
            LOG.info("###>: BlockingChannel is not connected!");
            return result;
        }
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, partitions,
                (short) 1, correlationId, clientId);
        try {
            channel.send(fetchRequest.underlying());
            OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload());
            result = fetchResponse.offsets();
        } catch (Exception e) {
            // Keep the stack trace in the log instead of printStackTrace().
            LOG.warn("###>: send offsetFetchRequest with exception", e);
        } finally {
            channel.disconnect();
        }
        return result;
    }

    /**
     * Fetches committed offsets for the given partition ids of one topic.
     *
     * @param partitionsIds partition numbers to query
     * @return map of partition id to committed offset; empty on failure
     */
    public static Map<Integer, Long> getOffsetFromKafka(String bootstrapServers, String groupId, String topic, List<Integer> partitionsIds) {
        List<TopicAndPartition> partitions = new ArrayList<>(partitionsIds.size());
        for (Integer i : partitionsIds) {
            partitions.add(new TopicAndPartition(topic, i.intValue()));
        }
        Map<Integer, Long> offsetMap = new HashMap<>();
        for (Entry<TopicAndPartition, OffsetMetadataAndError> entry
                : fetchOffsets(bootstrapServers, groupId, partitions).entrySet()) {
            offsetMap.put(entry.getKey().partition(), entry.getValue().offset());
        }
        return offsetMap;
    }

    /**
     * Fetches committed offsets for an explicit set of topic/partition pairs.
     *
     * @return map keyed by the new-API {@link TopicPartition}; empty on failure
     */
    public static Map<TopicPartition, Long> getOffsetFromKafkaByTopicAndMetadata(String bootstrapServers, String groupId,
                                                                                 Set<TopicAndPartition> topicPartitions) {
        List<TopicAndPartition> partitions = new ArrayList<>(topicPartitions);
        Map<TopicPartition, Long> topicPartitionLongMap = new HashMap<>();
        for (Entry<TopicAndPartition, OffsetMetadataAndError> entry
                : fetchOffsets(bootstrapServers, groupId, partitions).entrySet()) {
            TopicAndPartition tap = entry.getKey();
            topicPartitionLongMap.put(new TopicPartition(tap.topic(), tap.partition()),
                    entry.getValue().offset());
        }
        return topicPartitionLongMap;
    }

    /**
     * Fetches committed offsets for partitions {@code 0 .. partitionsNum-1} of one topic.
     *
     * @param partitionsNum number of partitions, queried as ids 0..n-1
     * @return map of partition id to committed offset; empty on failure
     */
    public static Map<Integer, Long> getOffsetFromKafkaByPartitionNum(String bootstrapServers, String groupId,
                                                                      String topic, int partitionsNum) {
        List<TopicAndPartition> partitions = new ArrayList<>(partitionsNum);
        for (int i = 0; i < partitionsNum; i++) {
            partitions.add(new TopicAndPartition(topic, i));
        }
        Map<Integer, Long> offsetMap = new HashMap<>();
        for (Entry<TopicAndPartition, OffsetMetadataAndError> entry
                : fetchOffsets(bootstrapServers, groupId, partitions).entrySet()) {
            offsetMap.put(entry.getKey().partition(), entry.getValue().offset());
        }
        return offsetMap;
    }

    /**
     * Commits the given offsets for a consumer group (OffsetCommitRequest version 1,
     * offsets stored in Kafka). Errors are logged, not rethrown.
     */
    public static void commitOffsetToKafka(String bootstrapServers, String groupId, Map<TopicAndPartition, OffsetAndMetadata> offsets) {
        BlockingChannel channel = createBlockingChannel(bootstrapServers);
        if (channel == null || !channel.isConnected()) {
            LOG.info("###>: BlockingChannel is not connected!");
            return;
        }
        OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupId, offsets, correlationId, clientId, (short) 1);
        try {
            LOG.debug("###testbug: begin to send OffsetCommitRequest");
            channel.send(commitRequest.underlying());
            // Response is read to drain the channel; per-partition error codes are
            // currently not inspected (coordinator moves are not retried here).
            OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().payload());
            if (commitResponse.hasError()) {
                LOG.warn("###>: offset commit response reported errors: {}", commitResponse.errors());
            }
        } catch (Exception e) {
            LOG.info("###>: commit offset request failed with exception", e);
        } finally {
            // Disconnect in finally so a failed send/receive cannot leak the channel
            // (original code disconnected inside the try block only).
            channel.disconnect();
        }
    }

    /**
     * Converts new-consumer-API offsets to the legacy commit format.
     *
     * <p>The partition number is stored as the metadata string; commit and expire
     * timestamps are set to now and now + 6 hours respectively.
     *
     * @return legacy {@code TopicAndPartition -> OffsetAndMetadata} map
     */
    public static Map<TopicAndPartition, OffsetAndMetadata> convertToCommon(Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets) {
        Map<TopicAndPartition, OffsetAndMetadata> convertedOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offset : offsets.entrySet()) {
            TopicAndPartition topicAndPartition = new TopicAndPartition(offset.getKey().topic(), offset.getKey().partition());
            OffsetMetadata offsetMetadata = new OffsetMetadata(offset.getValue().offset(), Integer.toString(offset.getKey().partition()));
            // Fixed unit bug: original used 6*3600*3600 ms (~21.6 h); the evident
            // intent is a 6-hour expiry, i.e. 6 h * 3600 s/h * 1000 ms/s.
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetMetadata,
                    System.currentTimeMillis(), System.currentTimeMillis() + 6L * 3600 * 1000);
            convertedOffsets.put(topicAndPartition, offsetAndMetadata);
        }
        return convertedOffsets;
    }

    /**
     * Ad-hoc manual test driver: commits a fixed offset for one partition, then
     * reads it back and logs the result. Broker address and group ids are hard-coded.
     */
    public static void main(String[] args)
    {
        Map<TopicAndPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (int i = 0; i < 1; i++) {
            TopicAndPartition topicAndPartition = new TopicAndPartition("Integer_Integer", i);
            Long offset1 = 80L;
            String metadata = Integer.toString(i);
            OffsetMetadata offsetMetadata = new OffsetMetadata(offset1, metadata);
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetMetadata, System.currentTimeMillis(), System.currentTimeMillis() + 2*60*60*1000);

            offsets.put(topicAndPartition, offsetAndMetadata);
        }
        commitOffsetToKafka("10.10.1.7:9092", "connectors-lsh-008", offsets);

        Set<TopicAndPartition> topicAndPartitionSet = new HashSet<>();
        topicAndPartitionSet.add(new TopicAndPartition("Integer_Integer", 0));
        Map<TopicPartition, Long> offsetss = Test.getOffsetFromKafkaByTopicAndMetadata("10.10.1.7:9092", "connectors-lsh-018", topicAndPartitionSet);
        for (Map.Entry<TopicPartition, Long> offset : offsetss.entrySet()) {
            LOG.info("###> partition = {}, offset = {}", offset.getKey().partition(), offset.getValue());
        }
    }
}